[python] Use pypaimon with Google Cloud Storage#7769
Open
larssk wants to merge 3 commits into apache:master from
## Purpose

Use Google Cloud Storage (GCS) as the warehouse / object storage for pypaimon (without Java dependencies). This PR adds `PyArrowFileIO` handling for GCS.
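A rough sketch of the intended usage, assuming the option names introduced by this PR. The bucket, token, and project values are placeholders, and the commented-out `CatalogFactory.create` call is illustrative only:

```python
# Hypothetical usage sketch. The option names "gcs.access-token",
# "gcs.access-token.expiration", and "gcs.project-id" come from this PR;
# bucket/token/project values below are placeholders.
options = {
    # GCS warehouse path; PyArrowFileIO dispatches on the gs:// scheme.
    "warehouse": "gs://my-bucket/warehouse",
    # All three credential properties are optional: with none set,
    # GcsFileSystem falls back to Application Default Credentials (ADC).
    "gcs.access-token": "ya29.placeholder-token",
    "gcs.access-token.expiration": "1767225600000",
    "gcs.project-id": "my-gcp-project",
}

# With pypaimon installed, a catalog could then be created from these
# options (API call hedged, shown for illustration only):
# from pypaimon import CatalogFactory
# catalog = CatalogFactory.create(options)
print(sorted(k for k in options if k.startswith("gcs.")))
```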
## Changes

1. **Scheme dispatch** (`pyarrow_file_io.py`): added a `gs` branch in `__init__` alongside the existing `s3` and `hdfs` branches.
2. **`_initialize_gcs_fs()` method** (`pyarrow_file_io.py`): uses `pyarrow.fs.GcsFileSystem`, which is already available via `pyarrow[gcs]`. With no arguments, `GcsFileSystem` picks up credentials automatically via Application Default Credentials (ADC): `GOOGLE_APPLICATION_CREDENTIALS`, the GCP metadata server, or Workload Identity on GKE/GCE. Three optional properties allow explicit credential passing: `gcs.access-token`, `gcs.access-token.expiration`, and `gcs.project-id`.
3. **Path fix in `to_filesystem_path()`** (`pyarrow_file_io.py`): `to_filesystem_path()` had a catch-all for non-S3/non-HDFS schemes that returned only `uri.path`; for `gs://my-bucket/data/table` it returned `/data/table`, stripping the bucket name. `GcsFileSystem` (like `S3FileSystem`) expects paths in `bucket/key` form without a leading slash. Added a `GcsFileSystem` branch mirroring the existing S3 logic.
4. **`GcsOptions` class** (`config.py`): added a `GcsOptions` class alongside the existing `S3Options`, documenting the three new properties.

## Linked issue

#7768
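The path fix described in change 3 can be sketched with plain `urllib` parsing. This mirrors the behavior the PR describes (bucket prepended, no leading slash), not the PR's exact code:

```python
from urllib.parse import urlparse

def to_gcs_filesystem_path(location: str) -> str:
    """Sketch of the gs:// branch described above: GcsFileSystem
    (like S3FileSystem) wants 'bucket/key' with no leading slash.
    Illustrative only; not the PR's actual to_filesystem_path() code."""
    uri = urlparse(location)
    if uri.scheme != "gs":
        # Already a plain filesystem path such as 'bucket/key'.
        return location
    # Prepend the bucket (netloc) and strip the leading slash from the key.
    key = uri.path.lstrip("/")
    return f"{uri.netloc}/{key}" if key else uri.netloc

print(to_gcs_filesystem_path("gs://my-bucket/data/table"))  # my-bucket/data/table
print(to_gcs_filesystem_path("gs://my-bucket"))             # my-bucket
```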
## Tests

### Unit tests — `pypaimon/tests/file_io_test.py`

A new `GCSFileIOPathTest` class is added to the existing `file_io_test.py`, following the same pattern as the existing S3/OSS path conversion tests. These tests require no GCS credentials.

- `test_gcs_filesystem_type`: `PyArrowFileIO("gs://...")` produces a `pafs.GcsFileSystem` instance
- `test_gcs_path_conversion`: `gs://bucket/key` maps to `bucket/key` (bucket prepended, no leading slash)
- `test_gcs_path_bucket_only`: `gs://bucket` with no path component maps to `bucket`
- `test_gcs_path_normalization`: `gs://bucket///a///b` maps to `bucket/a/b`
- `test_gcs_path_idempotency`: `bucket/key` paths pass through unchanged
- `test_gcs_path_no_leading_slash`: `to_filesystem_path` never returns a path starting with `/` for any GCS URI

### Integration tests — `pypaimon/tests/gcs_file_io_test.py`

A new `GCSFileIOTest` class is added following the pattern of `oss_file_io_test.py`. All tests are skipped automatically when `GCS_TEST_BUCKET` is not set. Credentials are picked up via ADC; no explicit credential properties are required.

- `test_gcs_filesystem_type`: `PyArrowFileIO` with `gs://` uses `GcsFileSystem`
- `test_exists`: `exists()` returns `False` for non-existent paths; `get_file_status()` raises `FileNotFoundError`
- `test_write_and_read_file`: `write_file()` / `read_file_utf8()` round-trip
- `test_write_file_overwrite`: `write_file(..., overwrite=False)` raises `FileExistsError`; `overwrite=True` replaces content
- `test_new_input_stream_read`: `new_output_stream()` / `new_input_stream()` binary round-trip; `FileNotFoundError` for a missing file
- `test_get_file_status_directory`: `get_file_status()` returns `FileType.Directory` for a directory
- `test_get_file_status_file`: `get_file_status()` returns `FileType.File` with a non-`None` size
- `test_delete_returns_false_when_not_exists`: `delete()` returns `False` for a non-existent file/directory
- `test_delete_non_empty_directory_raises_error`: `delete(..., recursive=False)` raises `OSError` for a non-empty directory
- `test_rename_returns_false_when_dst_exists`: `rename()` returns `False` when the destination already exists
- `test_copy_file`: `copy_file(..., overwrite=False)` raises `FileExistsError`; `overwrite=True` replaces content
- `test_try_to_write_atomic`: `try_to_write_atomic()` writes content and returns `True` on success
- `test_mkdirs_raises_error_when_path_is_file`: `mkdirs()` raises `FileExistsError` when the path is an existing file